package cn.doitedu.ml.tfidf

import cn.doitedu.commons.util.SparkUtil
import org.apache.log4j.{Level, Logger}
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.Row

/**
  * @date: 2020/2/22
  * @site: www.doitedu.cn
  * @author: hunter.d 涛哥
  * @qq: 657270652
  * @description:用来对给定文档，进行tfidf向量化计算
  * TF ： 是一个词在一篇文章中出现的次数
  * IDF：是一个词在样本空间中的常见程度（越少见，这个词的特别含义越大） ： lg(文档总数/(含有该词的文档数+1))
  *
  * TF-IDF = TF * IDF  (作为一个文档向量中，词的特征值)
  */
object TFIDF_RDD {

  def main(args: Array[String]): Unit = {

    Logger.getLogger("org").setLevel(Level.WARN)

    val spark = SparkUtil.getSparkSession(this.getClass.getSimpleName)
    import spark.implicits._

    /**
      * docid,doc
      * 1,a a a a a a x x y
      * 2,b b b x y
      * 3,c c x y
      * 4,d x
      */

    val df = spark.read.option("header", "true").csv("userprofile/data/demo/tfidf")

    // 1. 将文档用hash映射向量化，并用词的TF作为特征值
    val tfrdd = df.rdd.map({
      case Row(docid: String, doc: String)
      => {
        // 造一个全0的长度为6的特征值数组 <0,0,0,0,0,0>
        val arr = Array.fill(26)(0.0)

        // a a a a a a x x y = >  Map{a->6,x->2,y->1}  scala版wordcount
        val wc: Map[String, Int] = doc.split(" ").map((_, 1)).groupBy(_._1).mapValues(_.size)

        for ((w, c) <- wc) {
          arr((w.hashCode & Integer.MAX_VALUE) % 26) = c
        }
        (docid, arr)
      }
    })

    tfrdd.toDF("docid", "arr").show(100, false)
    /**
      * +-----+----------------------------------------------+
      * |docid|arr                                           |
      * +-----+----------------------------------------------+
      * |1    |[0.0,....., 2.0, 1.0, 0.0, 6.0, 0.0, 0.0, 0.0,|
      * |2    |[0.0,....., 1.0, 1.0, 0.0, 0.0, 3.0, 0.0, 0.0,|
      * |3    |[0.0,....., 1.0, 1.0, 0.0, 0.0, 0.0, 2.0, 0.0,|
      * |4    |[0.0,....., 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0,|
      * +-----+----------------------------------------------+
      *
      * =>  IDF:
      * [0.0,....., 0.0, 0.02,0.0, 1.02, ........ ]
      **/

    // 2. 计算每个词出现的文档数
    val res: Array[Double] = tfrdd
      // 将每一个TF特征值数组做变换： 非0值统一变成1 ，0就是0
      .map(_._2.map(x => if (x != 0) 1.0 else 0.0))
      // 将两条特征值数组，对应脚标的值累加到一起
      .reduce((arr1, arr2) => arr1.zip(arr2).map(tp => tp._1 + tp._2))
    //.fold(Array.fill(26)(0.0))((arr1,arr2)=>{arr1.zip(arr2).map(tp=>tp._1+tp._2)})


    // res ==>  4.0,3.0,0.0,1.0,1.0,1.0,1.0,0.0,0.0,0.0
    println(res.mkString(","))

    // 3. 计算每个词的IDF ：   lg（文档总数/包含该词的文档数)
    val docTotal: Long = tfrdd.count()
    // 注意，本结果idfArr是一个Driver端的普通本地数组
    val idfArr: Array[Double] = res.map(n => Math.log10(docTotal / (n + 0.01)))

    println(idfArr.mkString(","))

    // 将idf值数组，广播出去
    val bc = spark.sparkContext.broadcast(idfArr)

    // 生成最终的结果： 每篇文章的TF-IDF特征向量
    tfrdd.map(tp => {
      val idf = bc.value // 从广播变量中取到idf值数组
      val tfArr = tp._2 // 取到一行tf数组

      val tfidfArr = tfArr.zip(idfArr).map(tp => tp._1 * tp._2)
      (tp._1, tfidfArr)
    }).toDF("docid", "tfidf")
      .show(100, false)


    spark.close()
  }

}
